Part 1, Read transforms via expressions: Just compute the expression and return it.#607
Part 1, Read transforms via expressions: Just compute the expression and return it.#607nicklan merged 31 commits intodelta-io:mainfrom
Conversation
| /// be processed to complete the scan. Non-selected rows _must_ be ignored. The boolean flag | ||
| /// indicates whether the record batch is a log or checkpoint batch. | ||
| pub fn scan_action_iter( | ||
| pub(crate) fn scan_action_iter( |
There was a problem hiding this comment.
Note this is a significant change as we not longer expose this function. In discussion so far we've agreed that it basically should never have been pub, and I just made a mistake when doing so. An engine should call scan_data which mostly just proxies to this, but doesn't expose internal details to the engine.
Open to discussion though.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #607 +/- ##
==========================================
+ Coverage 84.00% 84.05% +0.05%
==========================================
Files 75 75
Lines 17097 17251 +154
Branches 17097 17251 +154
==========================================
+ Hits 14363 14501 +138
- Misses 2045 2050 +5
- Partials 689 700 +11 ☔ View full report in Codecov by Sentry. |
scovich
left a comment
There was a problem hiding this comment.
When parsing the Add file, if there are needed fix-ups (just partition columns today), the correct expression is created, and inserted into a row indexed map
Why do we need a map here? It seems like we either have a fixup for every row, or for no rows? Just apply the fixup conditionally if we see a non-empty vec of fixups?
| val.ok_or_else(|| { | ||
| Error::MissingData(format!("Data missing for field {field_name}")).with_backtrace() | ||
| }) |
There was a problem hiding this comment.
intentional/permanent change? Or just for debugging?
There was a problem hiding this comment.
intentional, since this error occurs in more than one place
There was a problem hiding this comment.
aside: I wonder if we should start adding some kind of "location code" as a (much) cheaper alternative to backtraces, that also stays stable as the code base evolves around it?
There was a problem hiding this comment.
Yeah, that could work. I'm not too worried about perf for backtraces as they should only appear in error cases though
kernel/src/scan/log_replay.rs
Outdated
| let have_seen = self.check_and_record_seen(file_key); | ||
| if is_add && !have_seen { | ||
| // compute transform here | ||
| if let Some(ref transform) = self.transform { |
There was a problem hiding this comment.
Agree on both nesting and rule of 30 here.
Also, this code is redundant:
let have_seen = self.check_and_record_seen(file_key);
if is_add && !have_seen {
... do stuff ...
}
Ok(is_add && !have_seen)The early return would make very clear what's going on:
if !is_add || have_seen {
return Ok(false);
}
... do stuff ...
Ok(true)
kernel/src/scan/log_replay.rs
Outdated
| let have_seen = self.check_and_record_seen(file_key); | ||
| if is_add && !have_seen { |
There was a problem hiding this comment.
Related to #615:
Data skipping runs before this visitor, which means we can't use the partition values for data skipping in its current form.
How should we proceed? Even if we run a partition value extraction visitor before data skipping, that builds a hash map of parsed partition value literals (instead of embedding them in a struct expression), we still can't use the normal data skipping expression machinery. We'd almost need the row visitor itself to apply partition pruning, using a DefaultPredicateEvaluator that sits on top of the partition values map. The (big) downside of that approach is it won't reliably handle predicates that mix references to partition columns and normal columns, e.g. the following predicate would have no data skipping at all, because both predicate evaluators would reject the OR due to a missing leg:
WHERE partition_col1 = 10 OR value_col2 = 20It would at least handle top-level AND gracefully, tho:
WHERE partition_col1 = 10 AND value_col2 = 20(because each predicate evaluator would work with the subset of the AND it understands)
There was a problem hiding this comment.
Even if we run a partition value extraction visitor before data skipping, that builds a hash map of parsed partition value literals (instead of embedding them in a struct expression), we still can't use the normal data skipping expression machinery.
Could you explain why we can't use the normal data skipping expression machinery? Current data skipping reads the stats field of add actions. I imagine we could use a visitor to extract the partition values along with the stats, then write back the stats field with updated values. Then data skipping proceeds as normal. idk if this is perhaps expensive, but I think it'll be important to be able to do data skipping on predicates with mixed references.
There was a problem hiding this comment.
We definitely want the effect of data skipping, one way or another. I just meant that today's data skipping flow happens before the row visitor that could extract and parse partition values.
Either we need to add a second visitor that runs first and updates the stats column, or we apply partition skipping as a completely separate step (that could run before or after normal data skipping). Updating the stats column has several disadvantages:
- Needs a separate visitor pass (runtime cost)
- We don't currently have any API for updating an
EngineData(we only have expression eval). We know we need to eventually add such capability, but we don't have it yet. - Stats-based pruning code isn't a great fit for partition values, because it wouldn't support nullcount based pruning and min/max based pruning is needlessly complex when always min=max for partition values.
That makes me wonder if we should apply partition pruning after stats-based pruning, as part of the existing row visitor that already filters out previously seen files:
- Parse partition values into a
HashMap<ColumnName, Scalar>, which already has#[cfg(test)] impl ResolveColumnAsScalarin predicates/mod.rs (just need to remove the feature flag from it). - Wrap a
DefaultPredicateEvaluatoraround the partition values hashmap, and evaluate it.
There was a problem hiding this comment.
Aha that makes sense. So move it till later to avoid complicating the existing data skipping and avoiding the runtime cost.
There was a problem hiding this comment.
As for mixed references -- it will work for a majority of cases, because most partition predicates are simple top-level conjuncts, like this:
WHERE partition_col1 = 10 AND value_col2 = 20The partition pruning code would handle the first conjunct (ignoring the second), and stats pruning code would handle the second conjunct (ignoring the first). This is actually how Delta-spark does it today.
There was a problem hiding this comment.
Seems like being out was a great way for me to get this resolved :)
In seriousness though, that suggestion makes sense. We can let the existing flow prune via stats, and then just run the predicate evaluator over the extracted hashmap in the visitor, which can modify its already existing selection vector to prune files where the partition doesn't match.
wrt. to this PR, I think the code flow then still makes sense, and we can take partition pruning as a follow-up?
OussamaSaoudi-db
left a comment
There was a problem hiding this comment.
looks good to me 👍
kernel/src/scan/log_replay.rs
Outdated
| get_state_info, | ||
| state::{DvInfo, Stats}, | ||
| test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback}, | ||
| test_utils::{ |
There was a problem hiding this comment.
nit: we can perhaps flatten these imports.
It's not the same for every row. The file could have a different value for the partition column for instance, or in the future each file could need the variant cols fixed up differently. Perhaps you meant we could just have a |
I think we have three potential cases:
Vec is definitely good for 1/ and 2/, and I estimate that Vec will also work Just Fine for 3/, given the batch sizes we expect to encounter in practice. Rationale:
|
Following discussion, agree a |
scovich
left a comment
There was a problem hiding this comment.
Couple nits to fix before merge, but otherwise LGTM
| val.ok_or_else(|| { | ||
| Error::MissingData(format!("Data missing for field {field_name}")).with_backtrace() | ||
| }) |
There was a problem hiding this comment.
aside: I wonder if we should start adding some kind of "location code" as a (much) cheaper alternative to backtraces, that also stays stable as the code base evolves around it?
kernel/src/scan/log_replay.rs
Outdated
| )); | ||
| }; | ||
| let name = field.physical_name(); | ||
| let value_expression = super::parse_partition_value( |
There was a problem hiding this comment.
aside: Looking at #624, I wonder if there's a (worthwhile) way to parse partition values only once per file action? But partition pruning and data fixup happen so far apart that I suspect it would be simpler (and maybe even cheaper) to parse a second time rather than try to build up and track a big side collection of parsed partition values.
It would perhaps be a different story if we had a clean way to convert partition values from string-string map to parsed struct using expressions, because then the partition values would be conveniently embedded in the log replay engine data. But I don't see that happening any time soon, given how much effort it would take to add map and string parsing expression support.
There was a problem hiding this comment.
Yeah, this is a good point. Depending on how we merge things, we should consider looking at it when the second of this or #624 go in
kernel/src/scan/log_replay.rs
Outdated
| partition_values.get(name), | ||
| field.data_type(), | ||
| )?; | ||
| Ok(value_expression.into()) |
There was a problem hiding this comment.
Technically this isn't an expression (yet). Maybe better to call it partition_value (scalar), which then gets converted into a (literal) expression?
| Ok(value_expression.into()) | |
| Ok(partition_value.into()) |
kernel/src/scan/log_replay.rs
Outdated
| let have_seen = self.check_and_record_seen(file_key); | ||
| if !is_add || have_seen { | ||
| return Ok(false); | ||
| } |
There was a problem hiding this comment.
This seems like a dangerous change (because somebody trying to "optimize" the code might produce control flow that skips non-adds without checking them first). Now that you no longer need the have_seen multiple times, can we partially revert so it matches the code comment at L142 again?
| let have_seen = self.check_and_record_seen(file_key); | |
| if !is_add || have_seen { | |
| return Ok(false); | |
| } | |
| if self.check_and_record_seen(file_key) || !is_add { | |
| return Ok(false); | |
| } |
There was a problem hiding this comment.
Actually, we should probably update the comment to match the new code:
// Check both adds and removes (skipping already-seen), but only transform and return adds
kernel/src/scan/log_replay.rs
Outdated
|
|
||
| fn validate_transform(transform: Option<&ExpressionRef>, expected_date_offset: i32) { | ||
| assert!(transform.is_some()); | ||
| if let Expression::Struct(inner) = transform.unwrap().as_ref() { |
There was a problem hiding this comment.
Seems like a good place for let-else matching?
let Expression::Struct(inner) = transform.unwrap().as_ref() else {
panic!("Transform should always be a struct expr");
};
assert_eq!(...);
let Expression::Column(ref name) = inner[0] else {
panic!("Expected first expression to be a column");
};
assert_eq!(...);
let Expression::Literal(ref scalar) = inner[1] else {
panic!("Expected second expression to be a literal");
};
assert_eq!(...);(less indentation => more readable)
kernel/src/scan/mod.rs
Outdated
| /// the query. NB: If you are using the default engine and plan to call arrow's | ||
| /// `filter_record_batch`, you _need_ to extend this vector to the full length of the batch or | ||
| /// arrow will drop the extra rows. | ||
| /// - `HashMap<usize, Expression>`: Transformation expressions that need to be applied. For each |
There was a problem hiding this comment.
I think it's using a Vec now?
There was a problem hiding this comment.
nice catch. updated and updated the description
kernel/src/scan/mod.rs
Outdated
| // Compute the static part of the transformation. This is `None` if no transformation is | ||
| // needed (currently just means no partition cols, but will be extended for other transforms | ||
| // as we support them) |
There was a problem hiding this comment.
The comment doesn't reference column mapping? Should it?
Also, what other kind of transform might there be, besides "static" referenced here?
There was a problem hiding this comment.
"Other transforms" means future things we may need to apply transforms for. So, variant decoding for example. If something needed variant decoding then the static_transform would not be None.
zachschuermann
left a comment
There was a problem hiding this comment.
looks great mostly just questions as I got caught up on everything! I guess my only concern is the row-based nature which in the worst case would have an expression for every row? if we had the notion of stateful KDF's I wonder if that would remove the need to create an expression for every row?
|
|
||
| pub type ScanData = (Box<dyn EngineData>, Vec<bool>); | ||
| /// A transform is ultimately a `Struct` expr. This holds the set of expressions that make that struct expr up | ||
| type Transform = Vec<TransformExpr>; |
There was a problem hiding this comment.
(idea for future) maybe worth introducing scan/transforms.rs for a transforms module?
There was a problem hiding this comment.
also qq how does this actually become struct? just since you use that to colocate all the different exprs in the vec?
There was a problem hiding this comment.
Yeah, not a bad idea on the module. I'll consider it when I handle merging this with #624.
A TransformExpr can either be a static "just select this col", or something that actually requires a transform, like "this is a partition col". In the log-replay code we iterate over this, turn the static "select cols" into Column expressions, and fill in the actual proper expressions for the others, which leaves a Vec<Expr> which we make a struct expr.
There was a problem hiding this comment.
ah yep got it thanks!
| // for other transforms as we support them) | ||
| let static_transform = (self.have_partition_cols | ||
| || self.snapshot.column_mapping_mode != ColumnMappingMode::None) | ||
| .then_some(Arc::new(Scan::get_static_transform(&self.all_fields))); |
There was a problem hiding this comment.
is this desirable over self.get_static_transform() sort of API just so we can recycle/use it elsewhere in the future?
There was a problem hiding this comment.
It would be, but to do that the tests for this functionality would always have to construct a snapshot, which is... hard at the moment. Perhaps when we switch to ResolvedTable we can simplify this
| /// Transforms aren't computed all at once. So static ones can just go straight to `Expression`, but | ||
| /// things like partition columns need to filled in. This enum holds an expression that's part of a | ||
| /// `Transform`. | ||
| pub(crate) enum TransformExpr { |
There was a problem hiding this comment.
wonder if we we should consider something to communicate that this is some sort of unresolved/partial transform. might be useful to call out that the static part is already resolved as an actual Expr for transform but then partition requires resolution to actually deduce the expr (i realize this is comment above haha but took me a little to catch on so wondering if we could optimize naming)
There was a problem hiding this comment.
Hopefully people read the comment :) I don't really want to bloat the name
kernel/src/scan/log_replay.rs
Outdated
| let field = self.logical_schema.fields.get_index(*field_idx); | ||
| let Some((_, field)) = field else { | ||
| return Err(Error::generic( | ||
| "logical schema did not contain expected field, can't transform data", |
There was a problem hiding this comment.
mega nit but might be useful to log the index?
Minor change: Propagate the computed transforms from #607 through calls to `visit_scan_files`.
Use the transform that has been computed (see #607 and #612) rather than using `transform_to_logical`. 1. Remove `column_mapping_mode` from `GlobalScanState` (it's not needed there anymore) 2. Remove the old `transform_to_logical` code 3. Add a new `scan::state::transform_to_logical` function that encapsulates the boilerplate of applying the transform expression 4. Use the new function where needed. Existing tests pass which test this functionality extensively.
What changes are proposed in this pull request?
This is the initial part of moving to using expressions to express transformations when reading data. What this PR does is:
AddfileAddfile, if there are needed fix-ups (just partition columns today), the correct expression is created, and inserted into a row indexed mapFollow-up PRs:
visit_scan_filestransform_to_logicalentirelyEach of those are more invasive and end up touching significant code, so I'm staging this as much as possible to make reviews easier.
How was this change tested?
Unit tests, and inspection of resultant expressions when run on tables